#include "insert.hpp"

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>

#include "all_type_variant.hpp"
#include "concurrency/transaction_context.hpp"
#include "hyrise.hpp"
#include "operators/abstract_operator.hpp"
#include "resolve_type.hpp"
#include "storage/abstract_segment.hpp"
#include "storage/mvcc_data.hpp"
#include "storage/segment_iterate.hpp"
#include "storage/value_segment.hpp"
#include "tasks/chunk_compression_task.hpp"
#include "types.hpp"
#include "utils/assert.hpp"
#include "utils/atomic_max.hpp"

namespace {

using namespace hyrise;  // NOLINT(build/namespaces)

template <typename T>
void copy_value_range(const std::shared_ptr<const AbstractSegment>& source_abstract_segment,
                      ChunkOffset source_begin_offset, const std::shared_ptr<AbstractSegment>& target_abstract_segment,
                      ChunkOffset target_begin_offset, ChunkOffset length) {
  DebugAssert(source_abstract_segment->size() >= source_begin_offset + length, "Source Segment out-of-bounds.");
  DebugAssert(target_abstract_segment->size() >= target_begin_offset + length, "Target Segment out-of-bounds.");

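  // The Insert operator only writes to (mutable) ValueSegments; other segment types are not modified in place.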
  const auto target_value_segment = std::dynamic_pointer_cast<ValueSegment<T>>(target_abstract_segment);
  Assert(target_value_segment, "Cannot insert into non-ValueSegments.");

  auto& target_values = target_value_segment->values();

  /**
   * If the source Segment is a ValueSegment, take a fast path to copy the data. Otherwise, take a (potentially slower)
   * fallback path.
   */
  if (const auto source_value_segment = std::dynamic_pointer_cast<const ValueSegment<T>>(source_abstract_segment)) {
    std::copy_n(source_value_segment->values().begin() + source_begin_offset, length,
                target_values.begin() + target_begin_offset);

    if (source_value_segment->is_nullable()) {
      const auto nulls_begin_iter = source_value_segment->null_values().begin() + source_begin_offset;
      const auto nulls_end_iter = nulls_begin_iter + length;

      auto nulls_target_offset = target_begin_offset;
      for (auto nulls_iter = nulls_begin_iter; nulls_iter != nulls_end_iter; ++nulls_iter) {
        if (*nulls_iter) {
          target_value_segment->set_null_value(nulls_target_offset);
        }
        ++nulls_target_offset;
      }
    }
  } else {
    segment_with_iterators<T>(*source_abstract_segment, [&](const auto& source_begin, const auto& /*source_end*/) {
      auto source_iter = source_begin + source_begin_offset;
      auto target_iter = target_values.begin() + target_begin_offset;

      // Copy values and NULL values.
      for (auto index = ChunkOffset{0}; index < length; ++index) {
        *target_iter = source_iter->value();

        if (source_iter->is_null()) {
          // The case of the target ValueSegment not being nullable is handled inside `set_null_value`.
          target_value_segment->set_null_value(ChunkOffset{target_begin_offset + index});
        }

        ++source_iter;
        ++target_iter;
      }
    });
  }
}

void deregister_insert(const std::shared_ptr<Table>& table, const ChunkID chunk_id, const std::shared_ptr<Chunk>& chunk,
                       const std::shared_ptr<MvccData>& mvcc_data) {
  // Deregister the pending Insert and try to mark the chunk as immutable. We might be the last committing/rolling back
  // Insert operator of a chunk that has reached its target size. In this case, the chunk was already allowed to be
  // marked as immutable, i.e., its `reached_target_size` flag was set (see `mark_as_full()` in `_on_execute()`), and
  // `try_set_immutable()` actually marks the chunk. Otherwise, `try_set_immutable()` is a no-op.
  const auto active_inserts = mvcc_data->deregister_insert();
  if (active_inserts == 0 && chunk->try_set_immutable()) {
    // We were the ones to mark the chunk as immutable. Thus, we have to ensure that the chunk gets encoded. We only
    // spin off a background task, so the current transaction does not have to wait for the encoding to complete.
    std::make_shared<ChunkCompressionTask>(table, chunk_id)->schedule();
  }
}

}  // namespace

namespace hyrise {

Insert::Insert(const std::string& target_table_name, const std::shared_ptr<const AbstractOperator>& values_to_insert)
    : AbstractReadWriteOperator(OperatorType::Insert, values_to_insert), _target_table_name(target_table_name) {}

const std::string& Insert::name() const {
  static const auto name = std::string{"Insert"};
  return name;
}

std::shared_ptr<const Table> Insert::_on_execute(std::shared_ptr<TransactionContext> context) {
  _target_table = Hyrise::get().storage_manager.get_table(_target_table_name);

  Assert(_target_table->target_chunk_size() > 0, "Expected target chunk size of target table to be greater than zero.");
  for (auto column_id = ColumnID{0}; column_id < _target_table->column_count(); ++column_id) {
    // This is not a fundamental limitation; we just did not want to pay the compile time for instantiating all type
    // combinations. If you really need this, it should only take a couple of lines to implement.
    Assert(left_input_table()->column_data_type(column_id) == _target_table->column_data_type(column_id),
           "Cannot handle inserts into a column of a different type.");
  }

  /**
   * 1. Allocate the required rows in the target Table, without actually copying data to them. Do so while locking the
   *    table to prevent multiple threads from modifying the table's size simultaneously. Since allocation is expected
   *    to be faster than writing the data, allocating under the lock and then writing without it (in a second step)
   *    minimizes the time that the Table's `_append_mutex` is held.
   */
  {
    const auto append_lock = _target_table->acquire_append_mutex();

    auto remaining_rows = left_input_table()->row_count();

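    // If the target table does not contain any chunks yet, append an initial mutable chunk to insert into.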
    if (_target_table->chunk_count() == 0) {
      _target_table->append_mutable_chunk();
    }
    while (remaining_rows > 0) {
      auto target_chunk_id = ChunkID{_target_table->chunk_count() - 1};
      auto target_chunk = _target_table->get_chunk(target_chunk_id);
      const auto target_size = _target_table->target_chunk_size();

      // If the last chunk of the target table is either immutable or full, append a new mutable chunk.
      if (!target_chunk->is_mutable() || target_chunk->size() == target_size) {
        _target_table->append_mutable_chunk();
        ++target_chunk_id;
        target_chunk = _target_table->get_chunk(target_chunk_id);
      }

      // Register that an Insert is pending. See `chunk.hpp` for details.
      const auto& mvcc_data = target_chunk->mvcc_data();
      DebugAssert(mvcc_data, "Insert cannot operate on a table without MVCC data.");
      mvcc_data->register_insert();

      const auto num_rows_for_target_chunk =
          std::min<size_t>(_target_table->target_chunk_size() - target_chunk->size(), remaining_rows);

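      // Remember the allocated row range of this chunk. Step 2 below copies the input data into these ranges, and
      // `_on_commit_records()` / `_on_rollback_records()` later update their MVCC data.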
      _target_chunk_ranges.emplace_back(
          ChunkRange{.chunk_id = target_chunk_id,
                     .begin_chunk_offset = target_chunk->size(),
                     .end_chunk_offset = static_cast<ChunkOffset>(target_chunk->size() + num_rows_for_target_chunk)});

      // Mark the new (but still empty) rows as being under modification by the current transaction. Do so before
      // resizing the Segments, because the resize of `Chunk::_segments.front()` is what releases the new row count.
      {
        const auto transaction_id = context->transaction_id();
        const auto end_offset = target_chunk->size() + num_rows_for_target_chunk;
        for (auto target_chunk_offset = target_chunk->size(); target_chunk_offset < end_offset; ++target_chunk_offset) {
          DebugAssert(mvcc_data->get_begin_cid(target_chunk_offset) == MAX_COMMIT_ID, "Invalid begin CID.");
          DebugAssert(mvcc_data->get_end_cid(target_chunk_offset) == MAX_COMMIT_ID, "Invalid end CID.");
          mvcc_data->set_tid(target_chunk_offset, transaction_id, std::memory_order_relaxed);
        }
      }

      // Make sure the MVCC data is written before the first segment (and, thus, the chunk) is resized.
      std::atomic_thread_fence(std::memory_order_seq_cst);

      // "Grow" data segments: Segments are pre-allocated during construction. We resize() to make default-constructed
      // cells visible so that they can be written in the next Insert step. Note that those cells are still invisible
      // from an MVCC point of view until they are actually being written and committed.
      // Do so in REVERSE column order so that the resize of `Chunk::_segments.front()` happens last. It is this last
      // resize that makes the new row count visible to the outside world.
      const auto old_size = target_chunk->size();
      const auto new_size = old_size + num_rows_for_target_chunk;
      const auto column_count = target_chunk->column_count();
      for (auto reverse_column_id = ColumnID{0}; reverse_column_id < column_count; ++reverse_column_id) {
        const auto column_id = static_cast<ColumnID>(column_count - reverse_column_id - 1);

        resolve_data_type(_target_table->column_data_type(column_id), [&](const auto data_type_t) {
          using ColumnDataType = typename decltype(data_type_t)::type;

          const auto value_segment =
              std::dynamic_pointer_cast<ValueSegment<ColumnDataType>>(target_chunk->get_segment(column_id));
          Assert(value_segment, "Cannot insert into non-ValueSegments.");

          // A growing resize() is not guaranteed to happen without reallocation. We thus check that the ValueSegment
          // has been pre-allocated with the target table's target chunk size reserved.
          Assert(value_segment->values().capacity() >= new_size, "ValueSegment insufficiently pre-allocated.");
          value_segment->resize(new_size);
        });

        // Make sure the first column's resize actually happens last and does not get reordered.
        std::atomic_thread_fence(std::memory_order_seq_cst);
      }

      if (new_size == target_size) {
        // Allow the chunk to be marked as immutable as it has reached its target size and no incoming Insert operators
        // will try to write to it. Pending Insert operators (including us) will call `try_set_immutable()` to make the
        // chunk immutable once they commit/roll back.
        target_chunk->mark_as_full();
      }

      remaining_rows -= num_rows_for_target_chunk;
    }
  }

  /**
   * 2. Insert the data into the memory allocated in the first step, without holding a lock on the Table.
   */
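  // Tracks the current read position in the input table; it is advanced across source chunks as rows are copied.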
  auto source_row_id = RowID{ChunkID{0}, ChunkOffset{0}};

  for (const auto& target_chunk_range : _target_chunk_ranges) {
    const auto target_chunk = _target_table->get_chunk(target_chunk_range.chunk_id);

    auto target_chunk_offset = target_chunk_range.begin_chunk_offset;
    auto target_chunk_range_remaining_rows =
        ChunkOffset{target_chunk_range.end_chunk_offset - target_chunk_range.begin_chunk_offset};

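    // A single target range may be filled from multiple (smaller) source chunks, so iterate until it is complete.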
    while (target_chunk_range_remaining_rows > 0) {
      const auto source_chunk = left_input_table()->get_chunk(source_row_id.chunk_id);
      const auto source_chunk_remaining_rows = ChunkOffset{source_chunk->size() - source_row_id.chunk_offset};
      const auto num_rows_current_iteration =
          std::min<ChunkOffset>(source_chunk_remaining_rows, target_chunk_range_remaining_rows);

      // Copy from the source into the target Segments.
      const auto column_count = target_chunk->column_count();
      for (auto column_id = ColumnID{0}; column_id < column_count; ++column_id) {
        const auto& source_segment = source_chunk->get_segment(column_id);
        const auto& target_segment = target_chunk->get_segment(column_id);

        resolve_data_type(_target_table->column_data_type(column_id), [&](const auto data_type_t) {
          using ColumnDataType = typename decltype(data_type_t)::type;
          copy_value_range<ColumnDataType>(source_segment, source_row_id.chunk_offset, target_segment,
                                           target_chunk_offset, num_rows_current_iteration);
        });
      }

      if (num_rows_current_iteration == source_chunk_remaining_rows) {
        // Proceed to next source Chunk
        ++source_row_id.chunk_id;
        source_row_id.chunk_offset = 0;
      } else {
        source_row_id.chunk_offset += num_rows_current_iteration;
      }

      target_chunk_offset += num_rows_current_iteration;
      target_chunk_range_remaining_rows -= num_rows_current_iteration;
    }
  }

  return nullptr;
}

void Insert::_on_commit_records(const CommitID cid) {
  for (const auto& target_chunk_range : _target_chunk_ranges) {
    const auto target_chunk = _target_table->get_chunk(target_chunk_range.chunk_id);
    const auto& mvcc_data = target_chunk->mvcc_data();

    for (auto chunk_offset = target_chunk_range.begin_chunk_offset; chunk_offset < target_chunk_range.end_chunk_offset;
         ++chunk_offset) {
      // Set the begin CIDs to the commit ID (from the initial MAX_COMMIT_ID), making the rows visible, and unlock the
      // rows by setting the TIDs back to 0.
      mvcc_data->set_begin_cid(chunk_offset, cid, std::memory_order_relaxed);
      mvcc_data->set_tid(chunk_offset, TransactionID{0}, std::memory_order_relaxed);
    }

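    // Keep track of the highest begin CID stored in this chunk's MVCC data.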
    set_atomic_max(mvcc_data->max_begin_cid, cid);

    // This fence ensures that the changes to the TIDs (which are written with relaxed memory ordering) are visible to
    // other threads.
    std::atomic_thread_fence(std::memory_order_release);
    deregister_insert(_target_table, target_chunk_range.chunk_id, target_chunk, mvcc_data);
  }
}

void Insert::_on_rollback_records() {
  for (const auto& target_chunk_range : _target_chunk_ranges) {
    const auto target_chunk = _target_table->get_chunk(target_chunk_range.chunk_id);
    const auto& mvcc_data = target_chunk->mvcc_data();

    for (auto chunk_offset = target_chunk_range.begin_chunk_offset; chunk_offset < target_chunk_range.end_chunk_offset;
         ++chunk_offset) {
      // Just unlock the rows by resetting their TIDs to 0. For other transactions, these rows look like they were
      // never inserted (which is basically true).
      mvcc_data->set_tid(chunk_offset, TransactionID{0}, std::memory_order_relaxed);
    }

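    // The rolled-back rows will never become visible to any transaction; account for them as invalidated rows.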
    const auto record_count = target_chunk_range.end_chunk_offset - target_chunk_range.begin_chunk_offset;
    target_chunk->increase_invalid_row_count(ChunkOffset{record_count}, std::memory_order_release);
    deregister_insert(_target_table, target_chunk_range.chunk_id, target_chunk, mvcc_data);
  }
}

std::shared_ptr<AbstractOperator> Insert::_on_deep_copy(
    const std::shared_ptr<AbstractOperator>& copied_left_input,
    const std::shared_ptr<AbstractOperator>& /*copied_right_input*/,
    std::unordered_map<const AbstractOperator*, std::shared_ptr<AbstractOperator>>& /*copied_ops*/) const {
  return std::make_shared<Insert>(_target_table_name, copied_left_input);
}

void Insert::_on_set_parameters(const std::unordered_map<ParameterID, AllTypeVariant>& /*parameters*/) {}

}  // namespace hyrise
